package com.innovation.ic.sc.base.thread.listener.kafka;

import com.alibaba.fastjson.JSONObject;
import com.innovation.ic.b1b.framework.thread.AbstractUnlockThread;
import com.innovation.ic.sc.base.handler.sc.ModelHandler;
import com.innovation.ic.sc.base.mapper.sc.PersonnelManagementMapper;
import com.innovation.ic.sc.base.model.sc.Inventory;
import com.innovation.ic.sc.base.pojo.constant.handler.RabbitMqConstants;
import com.innovation.ic.sc.base.pojo.variable.inventory.InventoryAllSituationPojo;
import com.innovation.ic.sc.base.service.ServiceHelper;
import com.innovation.ic.sc.base.service.sc.InventoryService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * @desc   针对kafka的抽象线程类
 * @author linuo
 * @time   2022年10月18日14:03:57
 */
public abstract class AbstractKafkaThread extends AbstractUnlockThread {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    /** 操作类型 */
    protected String type;

    /** 数据库变动内容 */
    protected ConsumerRecord<?, ?> consumer;

    /** 操作数据 */
    protected JSONObject dataJsonObject;

    /** model包装类的处理器 */
    protected ModelHandler modelHandler;

    protected ServiceHelper serviceHelper;

    protected InventoryService inventoryService;

    protected PersonnelManagementMapper personnelManagementMapper;

    /**
     * 获取消息体中的数据内容
     * @param bodyString 消息体
     * @return 返回消息体中的数据内容
     */
    protected String getMessageBody(String bodyString) {
        String messageBody = null;
        JSONObject json = (JSONObject) JSONObject.parse(bodyString);
        if (json != null && !json.isEmpty()) {
            messageBody = json.getString(RabbitMqConstants.MESSAGE_BODY);
        }
        return messageBody;
    }

    /**
     * 打印监听到的数据库变动内容
     * @param consumer consumer
     */
    protected String printConsumerContent(ConsumerRecord<?, ?> consumer) {
        logger.info("topic【" + consumer.topic() + "】，key【" + consumer.key() + "】，" +
                "分区位置【" + consumer.partition() + "】，下标【" + consumer.offset() + "】，" +
                "value【" + consumer.value() + "】");
        String value = (String) consumer.value();
        JSONObject valueJSONObject = JSONObject.parseObject(value);
        String zookeeperNode = valueJSONObject.getJSONArray("data").getJSONObject(0).getString("zookeeper_node");
        logger.info("zookeeper_node【{}】", zookeeperNode);
        return zookeeperNode;
    }

    /**
     * 根据库存数据组装库存所有情况的pojo类集合
     * @param inventory 库存数据
     * @return 返回结果
     */
    public List<InventoryAllSituationPojo> getAllSituationPojoListByInventoryData(Inventory inventory){
        List<InventoryAllSituationPojo> list = new ArrayList<InventoryAllSituationPojo>();

        InventoryAllSituationPojo inventoryAllSituationPojo = new InventoryAllSituationPojo();
        inventoryAllSituationPojo.setStatus(inventory.getStatus());
        inventoryAllSituationPojo.setInventoryHome(inventory.getInventoryHome());
        inventoryAllSituationPojo.setUnitPriceType(inventory.getUnitPriceType());
        inventoryAllSituationPojo.setLadderPriceId(inventory.getLadderPriceId());
        inventoryAllSituationPojo.setEnterpriseId(inventory.getEnterpriseId());
        list.add(inventoryAllSituationPojo);

        inventoryAllSituationPojo = new InventoryAllSituationPojo();
        inventoryAllSituationPojo.setStatus(inventory.getStatus());
        inventoryAllSituationPojo.setEnterpriseId(inventory.getEnterpriseId());
        list.add(inventoryAllSituationPojo);

        inventoryAllSituationPojo = new InventoryAllSituationPojo();
        inventoryAllSituationPojo.setInventoryHome(inventory.getInventoryHome());
        inventoryAllSituationPojo.setEnterpriseId(inventory.getEnterpriseId());
        list.add(inventoryAllSituationPojo);

        inventoryAllSituationPojo = new InventoryAllSituationPojo();
        inventoryAllSituationPojo.setUnitPriceType(inventory.getUnitPriceType());
        inventoryAllSituationPojo.setEnterpriseId(inventory.getEnterpriseId());
        list.add(inventoryAllSituationPojo);

        inventoryAllSituationPojo = new InventoryAllSituationPojo();
        inventoryAllSituationPojo.setLadderPriceId(inventory.getLadderPriceId());
        inventoryAllSituationPojo.setEnterpriseId(inventory.getEnterpriseId());
        list.add(inventoryAllSituationPojo);

        inventoryAllSituationPojo = new InventoryAllSituationPojo();
        inventoryAllSituationPojo.setEnterpriseId(inventory.getEnterpriseId());
        list.add(inventoryAllSituationPojo);

        return list;
    }
}